[FLINK-40040][cdc-runtime] Wrap coordinator failure in SerializedThrowable across the RPC boundary #4456
Open
Zhile wants to merge 1 commit into
…wable across RPC boundary

SchemaRegistry.failJob now wraps the failure cause into a SerializedThrowable before it crosses the operator-coordinator RPC boundary, and runInEventLoop routes through failJob so all exits share the wrapping. Otherwise an exception whose class only lives in the user classloader (e.g. the MySQL driver's com.mysql.cj.exceptions.ConnectionIsClosedException raised during table discovery) fails to deserialize with ClassNotFoundException on the receiving side, where flink-rpc-akka uses an isolated classloader. The coordination response is then lost, and the SchemaOperator request stalls until rpcTimeout and fails with a misleading TimeoutException, turning a transient error into a restart loop that only a full job restart recovers from.
Member
Could you please check the failing test cases?
What is the purpose of the change
Schema-evolution coordination responses are dropped when the failure's cause is a class that only lives in the user classloader (e.g. com.mysql.cj.exceptions.ConnectionIsClosedException). flink-rpc-akka deserializes the coordination response with an isolated classloader and throws ClassNotFoundException, so the response never reaches SchemaOperator, which then blocks on responseFuture.get(rpcTimeout) and fails with a misleading TimeoutException, turning a transient error into a restart loop that only a full job restart recovers from.
Observed in production (Flink CDC 3.6.0 on Flink 1.20.3):
JIRA: https://issues.apache.org/jira/browse/FLINK-40040
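The masking effect described above can be illustrated with the JDK alone. The sketch below is not Flink code: DroppedResponseDemo and await are hypothetical names, and a never-completed CompletableFuture stands in for a coordination response that was lost in transit. It only shows why the caller sees a TimeoutException rather than the real cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DroppedResponseDemo {

    /** Waits for a response and reports what the caller would observe. */
    static String await(CompletableFuture<String> responseFuture, long timeoutMillis) {
        try {
            return "response: " + responseFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Nothing arrived in time; the real cause is invisible to the caller.
            return "TimeoutException";
        } catch (ExecutionException e) {
            // The failure was delivered, so the real cause is visible.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Case 1: the failure response is dropped, so the future is never completed.
        CompletableFuture<String> dropped = new CompletableFuture<>();
        System.out.println(await(dropped, 100));

        // Case 2: the failure response is delivered to the caller.
        CompletableFuture<String> delivered = new CompletableFuture<>();
        delivered.completeExceptionally(new IllegalStateException("connection is closed"));
        System.out.println(await(delivered, 100));
    }
}
```

Running main prints TimeoutException for the dropped case and failed: connection is closed for the delivered case, which is the difference between the misleading symptom and the real cause.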
Brief change log
- SchemaRegistry.failJob now wraps the cause into SerializedThrowable (idempotent, not re-wrapped if already one) before calling handleUnrecoverableError (which completes pending coordination responses exceptionally) and context.failJob. SerializedThrowable carries the original exception as bytes plus a stringified stack trace, so the receiving side can deserialize it without the original class and still see the real cause.
- SchemaRegistry.runInEventLoop's catch block now routes through failJob instead of duplicating the fail path, so every exit shares the wrapping.
- SchemaCoordinator, since both complete pending response futures via the shared failJob -> handleUnrecoverableError path.
Verifying this change
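The wrapping described in the change log, and the reason it survives an isolated classloader, can be reproduced with the JDK alone. The sketch below does not use Flink's SerializedThrowable. PortableThrowable, UserOnlyException, wrap and deserializeIsolated are hypothetical stand-ins, and the isolated classloader is simulated by an ObjectInputStream that refuses to resolve one class name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UncheckedIOException;

public class PortableFailureDemo {

    /** Stands in for an exception class that exists only in the user classloader. */
    static class UserOnlyException extends RuntimeException {
        UserOnlyException(String message) { super(message); }
    }

    /** Hypothetical wrapper whose own fields are JDK types only. */
    static class PortableThrowable extends Exception {
        private final byte[] originalBytes;      // original exception, opaque to the receiver
        private final String originalStackTrace; // readable without the original class

        PortableThrowable(Throwable original) {
            super(original.getClass().getName() + ": " + original.getMessage());
            this.originalBytes = serialize(original);
            StringWriter sw = new StringWriter();
            original.printStackTrace(new PrintWriter(sw, true));
            this.originalStackTrace = sw.toString();
        }

        String originalStackTrace() { return originalStackTrace; }
    }

    /** Idempotent: an already wrapped failure is returned unchanged. */
    static PortableThrowable wrap(Throwable t) {
        return (t instanceof PortableThrowable) ? (PortableThrowable) t : new PortableThrowable(t);
    }

    static byte[] serialize(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Simulates a receiver whose classloader cannot see UserOnlyException. */
    static Object deserializeIsolated(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (desc.getName().endsWith("UserOnlyException")) {
                    throw new ClassNotFoundException(desc.getName());
                }
                return super.resolveClass(desc);
            }
        }) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Throwable raw = new RuntimeException("table discovery failed",
                new UserOnlyException("connection is closed"));
        try {
            deserializeIsolated(serialize(raw));
            System.out.println("raw: deserialized");
        } catch (ClassNotFoundException e) {
            System.out.println("raw: ClassNotFoundException");
        }

        PortableThrowable wrapped = wrap(raw);
        PortableThrowable received =
                (PortableThrowable) deserializeIsolated(serialize(wrapped));
        System.out.println("wrapped keeps cause text: "
                + received.originalStackTrace().contains("connection is closed"));
        System.out.println("idempotent: " + (wrap(wrapped) == wrapped));
    }
}
```

The raw exception fails because the stream has to resolve the cause's class, while the wrapper carries that class only as a byte array and a string, so the receiver never needs to load it.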
Added SchemaRegistryFailJobSerializationTest:
- failJobWrapsExceptionIntoSerializedThrowable asserts the failure delivered to the coordinator context is a SerializedThrowable.
- wrappedFailureSurvivesIsolatedClassloaderButRawOneDoesNot shows a raw exception whose cause is a classloader-isolated type fails to deserialize (ClassNotFoundException), while the SerializedThrowable-wrapped one deserializes and preserves the real cause text.
Does this pull request potentially affect one of the following parts